package com.fwmagic.dynamic_rule.service;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.RuleAtomicParam;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.impl.*;
import com.fwmagic.dynamic_rule.utils.SystemPrintUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.state.ListState;
import scala.annotation.meta.param;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;

/**
 * Query routing service.
 *
 * <p>Decides, per rule condition, which backing query service is asked:
 * the profile service (HBase implementation), the "state" services
 * (Flink-state implementations) for recent data, or the ClickHouse services
 * for older data. The boundary between "recent" and "older" is the split
 * point computed by {@link #currentSplitPoint()}.
 *
 * <p>The rule objects passed in are temporarily modified while a query is
 * segmented by time, and are restored to their original time ranges /
 * condition lists before the method returns, so the same rule object can be
 * evaluated again.
 */
public class QueryRouterServiceV3 {

    private final UserProfileQueryService userProfileQueryService;

    private final UserActionCountQueryService userActionCountQueryService;

    private final UserActionSequenceQueryService userActionSequenceQueryService;

    private final UserActionCountQueryService userActionCountQueryClickHouseService;

    private final UserActionSequenceQueryService userActionSequenceQueryClickHouseService;

    /**
     * Builds the underlying query services.
     *
     * @throws Exception if any of the underlying services fails to initialise
     */
    public QueryRouterServiceV3() throws Exception {
        // Core query services: profile lookup plus the state-backed count/sequence services.
        userProfileQueryService = new UserProfileQueryServiceHbaseImpl();
        userActionCountQueryService = new UserActionCountQueryServiceStateImpl();
        userActionSequenceQueryService = new UserActionSequenceQueryServiceStateImpl();

        // ClickHouse-backed count/sequence services.
        userActionCountQueryClickHouseService = new UserActionCountQueryServiceClickHouseImpl();
        userActionSequenceQueryClickHouseService = new UserActionSequenceQueryServiceClickHouseImpl();
    }

    // ---- routing for profile conditions ----

    /**
     * Checks the rule's profile conditions for the device that produced the event.
     *
     * @param logBean   the current event; only its device id is used
     * @param ruleParam the rule whose profile conditions are checked
     * @return the result of the profile service's {@code judgeProfileCondition}
     */
    public boolean userProfileQuery(LogBean logBean, RuleParam ruleParam) {
        return userProfileQueryService.judgeProfileCondition(logBean.getDeviceId(), ruleParam);
    }

    // ---- routing for count conditions ----

    /**
     * Checks the rule's action-count conditions, routing each condition by its time range.
     *
     * <ul>
     *   <li>range starts at or after the split point: state service only;</li>
     *   <li>range ends before the split point: ClickHouse service only;</li>
     *   <li>range spans the split point: state service on the recent segment first,
     *       then ClickHouse on the older segment if the state query did not match.</li>
     * </ul>
     *
     * <p>Each condition is evaluated individually against the service chosen for it, so
     * a service never sees a condition that belongs to another time segment.
     * NOTE(review): this assumes the rule-level {@code queryActionCounts(String, RuleParam)}
     * is an AND over the atomic conditions — confirm against the service implementations.
     *
     * @param logBean   the current event; its device id is passed to ClickHouse queries
     * @param listState not read by this method; kept for interface compatibility
     * @param ruleParam the rule whose count conditions are checked
     * @return {@code true} if there are no count conditions or every one matched
     * @throws Exception propagated from the underlying query services
     */
    public boolean userActionCountQuery(LogBean logBean, ListState<LogBean> listState, RuleParam ruleParam) throws Exception {
        List<RuleAtomicParam> userActionCountParam = ruleParam.getUserActionCountParam();
        if (CollectionUtils.isEmpty(userActionCountParam)) {
            return true;
        }

        long splitPoint = currentSplitPoint();

        // Conditions entirely in the recent segment.
        List<RuleAtomicParam> nearRuleAtomicParams = new ArrayList<>();
        // Conditions entirely in the older segment.
        List<RuleAtomicParam> farRuleAtomicParams = new ArrayList<>();
        // Conditions whose range straddles the split point.
        List<RuleAtomicParam> crossoverRuleAtomicParams = new ArrayList<>();

        for (RuleAtomicParam param : userActionCountParam) {
            if (param.getRangeEnd() < splitPoint) {
                farRuleAtomicParams.add(param);
            } else if (param.getRangeStart() >= splitPoint) {
                nearRuleAtomicParams.add(param);
            } else {
                crossoverRuleAtomicParams.add(param);
            }
        }

        if (!nearRuleAtomicParams.isEmpty()) {
            SystemPrintUtils.printLog("查询Flink的State数据");
            for (RuleAtomicParam param : nearRuleAtomicParams) {
                if (!userActionCountQueryService.queryActionCounts("", param)) {
                    return false;
                }
            }
        }

        if (!farRuleAtomicParams.isEmpty()) {
            SystemPrintUtils.printLog("查询ClickHouse的数据");
            for (RuleAtomicParam param : farRuleAtomicParams) {
                if (!userActionCountQueryClickHouseService.queryActionCounts(logBean.getDeviceId(), param)) {
                    return false;
                }
            }
        }

        if (!crossoverRuleAtomicParams.isEmpty()) {
            SystemPrintUtils.printLog("查询Flink的State和ClickHouse的数据");
            for (RuleAtomicParam param : crossoverRuleAtomicParams) {
                if (!crossoverCountMatches(logBean, param, splitPoint)) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Evaluates one count condition whose range spans the split point.
     *
     * <p>The condition's range is narrowed to the recent segment for the state query and,
     * if that does not match, to the older segment for the ClickHouse query. The original
     * range is always put back afterwards.
     *
     * <p>NOTE(review): the two segments are checked independently; a threshold that is only
     * reached by adding both segments' counts is reported as not matched. Summing would need
     * a count-returning API that is not visible from this class.
     */
    private boolean crossoverCountMatches(LogBean logBean, RuleAtomicParam param, long splitPoint) throws Exception {
        long originalStart = param.getRangeStart();
        long originalEnd = param.getRangeEnd();
        try {
            // Recent segment first: [splitPoint, originalEnd] against state.
            param.setRangeStart(splitPoint);
            if (userActionCountQueryService.queryActionCounts("", param)) {
                return true;
            }

            // Not matched there: older segment [originalStart, splitPoint] against ClickHouse.
            param.setRangeStart(originalStart);
            param.setRangeEnd(splitPoint);
            return userActionCountQueryClickHouseService.queryActionCounts(logBean.getDeviceId(), param);
        } finally {
            // Never leave the caller's rule with a rewritten time window.
            param.setRangeStart(originalStart);
            param.setRangeEnd(originalEnd);
        }
    }

    // ---- routing for sequence conditions ----

    /**
     * Checks the rule's action-sequence condition, routing by the time range of the
     * first step (that range is applied to all steps when the query is segmented).
     *
     * <ul>
     *   <li>range starts after the split point: state service only;</li>
     *   <li>range ends before the split point: ClickHouse service only;</li>
     *   <li>otherwise: state on the recent segment; if not matched, ClickHouse on the
     *       older segment; if still not matched, state again on the recent segment for
     *       the steps ClickHouse did not reach. The combined number of matched steps is
     *       written to the rule's queried-max-step field.</li>
     * </ul>
     *
     * @param logBean   the current event; its device id is passed to ClickHouse queries
     * @param listState not read by this method; kept for interface compatibility
     * @param ruleParam the rule whose sequence condition is checked
     * @return {@code true} if there is no sequence condition or all steps matched
     * @throws Exception propagated from the underlying query services
     */
    public boolean userActionSequenceQuery(LogBean logBean, ListState<LogBean> listState, RuleParam ruleParam) throws Exception {
        // Sequence steps declared by the rule.
        List<RuleAtomicParam> userActionSequenceParam = ruleParam.getUserActionSequenceParam();
        if (CollectionUtils.isEmpty(userActionSequenceParam)) {
            return true;
        }

        // Number of steps the rule requires.
        int totalStep = userActionSequenceParam.size();
        long splitPoint = currentSplitPoint();

        RuleAtomicParam firstStep = userActionSequenceParam.get(0);
        long rangeStart = firstStep.getRangeStart();
        long rangeEnd = firstStep.getRangeEnd();

        if (rangeStart > splitPoint) {
            SystemPrintUtils.printLog("进入：rangeStart > splitPoint");
            // Entirely recent: state only.
            return userActionSequenceQueryService.queryActionSequence("", ruleParam);
        }
        if (rangeEnd < splitPoint) {
            SystemPrintUtils.printLog("进入：rangeEnd < splitPoint");
            // Entirely old: ClickHouse only.
            return userActionSequenceQueryClickHouseService.queryActionSequence(logBean.getDeviceId(), ruleParam);
        }

        SystemPrintUtils.printLog("进入分段查询！");

        // Snapshot every step's own range so it can be restored exactly.
        long[] originalStarts = new long[totalStep];
        long[] originalEnds = new long[totalStep];
        for (int i = 0; i < totalStep; i++) {
            RuleAtomicParam step = userActionSequenceParam.get(i);
            originalStarts[i] = step.getRangeStart();
            originalEnds[i] = step.getRangeEnd();
        }

        try {
            // 1. Optimistic attempt: the recent segment alone may already satisfy the sequence.
            modifyRangeTime(userActionSequenceParam, splitPoint, rangeEnd);
            if (userActionSequenceQueryService.queryActionSequence("", ruleParam)) {
                return true;
            }

            // 2. Otherwise query the older segment in ClickHouse. The step counter is reset
            //    first so the value read back belongs to this query only, whether the
            //    service overwrites or accumulates it.
            modifyRangeTime(userActionSequenceParam, rangeStart, splitPoint);
            ruleParam.setUserActionSequenceQueriedMaxStep(0);
            if (userActionSequenceQueryClickHouseService.queryActionSequence(logBean.getDeviceId(), ruleParam)) {
                return true;
            }
            // Steps matched in the older segment.
            int farMaxStep = ruleParam.getUserActionSequenceQueriedMaxStep();

            // 3. Continue in the recent segment from the first step the older segment missed.
            modifyRangeTime(userActionSequenceParam, splitPoint, rangeEnd);
            List<RuleAtomicParam> remainingSteps = userActionSequenceParam.subList(farMaxStep, userActionSequenceParam.size());
            ruleParam.setUserActionSequenceParam(remainingSteps);
            ruleParam.setUserActionSequenceQueriedMaxStep(0);
            userActionSequenceQueryService.queryActionSequence("", ruleParam);
            int nearMaxStep = ruleParam.getUserActionSequenceQueriedMaxStep();

            // Store the combined result: each segment's steps counted exactly once.
            int matchedSteps = farMaxStep + nearMaxStep;
            ruleParam.setUserActionSequenceQueriedMaxStep(matchedSteps);
            SystemPrintUtils.printLog("==--->farMaxStep:" + farMaxStep + ", nearMaxStep:" + nearMaxStep + ", totalStep:" + totalStep);
            return totalStep == matchedSteps;
        } finally {
            // Put back the full step list and every step's original range.
            ruleParam.setUserActionSequenceParam(userActionSequenceParam);
            for (int i = 0; i < totalStep; i++) {
                RuleAtomicParam step = userActionSequenceParam.get(i);
                step.setRangeStart(originalStarts[i]);
                step.setRangeEnd(originalEnds[i]);
            }
        }
    }

    /**
     * Computes the boundary between the recent and the older time segment:
     * the current time rounded up to the hour ({@code DateUtils.ceiling}), minus two hours.
     */
    private long currentSplitPoint() {
        Date nextHour = DateUtils.ceiling(new Date(), Calendar.HOUR);
        return DateUtils.addHours(nextHour, -2).getTime();
    }

    /** Sets the same time range on every step of the given sequence. */
    private void modifyRangeTime(List<RuleAtomicParam> userActionSequenceParam, long rangeStart, long rangeEnd) {
        for (RuleAtomicParam step : userActionSequenceParam) {
            step.setRangeStart(rangeStart);
            step.setRangeEnd(rangeEnd);
        }
    }
}
